of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
Pexpect Copyright (c) 2006 Noah Spurrier
http://pexpect.sourceforge.net/
$Revision: 1.2 $
$Date: 2007/01/11 20:51:46 $
'''
# All of these modules are required. When one is missing the ImportError is
# re-raised with a hint that Pexpect is intended for UNIX-like systems.
try:
    import os
    import sys
    import time
    import select
    import string
    import re
    import struct
    import resource
    import types
    import pty
    import tty
    import termios
    import fcntl
    import errno
    import traceback
    import signal
except ImportError as e:
    # Keep the interpreter's own message (it names the module that could not
    # be imported) in front of the platform hint.
    raise ImportError(str(e) + '\nA critical module was not found. Probably this operating system does not support it.\nPexpect is intended for UNIX-like operating systems.')
# Version identifiers of this module.
__version__ = '2.1'
__revision__ = '$Revision: 1.2 $'

# Names made available by a star-import of this module.
__all__ = [
    'ExceptionPexpect',
    'EOF',
    'TIMEOUT',
    'spawn',
    'run',
    'which',
    'split_command_line',
    '__version__',
    '__revision__',
]
class ExceptionPexpect(Exception):
    '''Base class for all exceptions raised by this module.

    The object given to the constructor is stored on ``self.value`` and
    ``str()`` of the exception is ``str()`` of that object.
    '''

    def __init__(self, value):
        '''Remember value and register it with the Exception base class.'''
        # Passing the value on to Exception keeps ``args`` (and therefore
        # repr() and pickling) in step with ``self.value``; without this
        # call ``args`` would be an empty tuple.
        Exception.__init__(self, value)
        self.value = value

    def __str__(self):
        '''Return the stored value converted to a string.'''
        return str(self.value)
def get_trace(self):
'''This returns an abbreviated stack trace with lines that only concern the caller.
In other words, the stack trace inside the Pexpect module is not included.
self.use_native_pty_fork = not (sys.platform.lower().find('solaris') >= 0)
if command is None:
self.command = None
self.args = None
self.name = '<pexpect factory incomplete>'
return None
if type(command) == type(0):
raise ExceptionPexpect('Command is an int type. If this is a file descriptor then maybe you want to use fdpexpect.fdspawn which takes an existing file descriptor instead of a command string.')
if type(args) != type([]):
raise TypeError('The argument, args, must be a list.')
if args == []:
self.args = split_command_line(command)
self.command = self.args[0]
else:
self.args = args[:]
self.args.insert(0, command)
self.command = command
command_with_path = which(self.command)
if command_with_path is None:
raise ExceptionPexpect('The command was not found or was not executable: %s.' % self.command)
self.command = command_with_path
self.args[0] = self.command
self.name = '<' + ' '.join(self.args) + '>'
self._spawn__spawn()
def __del__(self):
    '''Release the child file descriptor when the object is collected.

    Garbage collection only reclaims Python objects; an OS file
    descriptor has to be closed explicitly, so close() is called
    unless the object is already marked as closed.
    '''
    if self.closed:
        return
    self.close()
def __str__(self):
'''This returns the current state of the pexpect object as a string.
'''This reads at most size characters from the child application.
It includes a timeout. If the read does not complete within the
timeout period then a TIMEOUT exception is raised.
If the end of file is read then an EOF exception will be raised.
If a log file was set using setlog() then all data will
also be written to the log file.
If timeout==None then the read may block indefinitely.
If timeout==-1 then the self.timeout value is used.
If timeout==0 then the child is polled and
if there was no data immediately ready then this will raise a TIMEOUT exception.
The "timeout" refers only to the amount of time to read at least one character.
This is not affected by the \'size\' parameter, so if you call
read_nonblocking(size=100, timeout=30) and only one character is
available right away then one character will be returned immediately.
It will not wait for 30 seconds for another 99 characters to come in.
This is a wrapper around os.read().
It uses select.select() to implement a timeout.
'''
if self.closed:
raise ValueError('I/O operation on closed file in read_nonblocking().')
if timeout == -1:
timeout = self.timeout
if not self.isalive():
(r, w, e) = self._spawn__select([
self.child_fd], [], [], 0)
if not r:
self.flag_eof = True
raise EOF('End Of File (EOF) in read_nonblocking(). Braindead platform.')
elif self._spawn__irix_hack:
(r, w, e) = self._spawn__select([
self.child_fd], [], [], 2)
if not r and not self.isalive():
self.flag_eof = True
raise EOF('End Of File (EOF) in read_nonblocking(). Pokey platform.')
(r, w, e) = self._spawn__select([
self.child_fd], [], [], timeout)
if not r:
if not self.isalive():
self.flag_eof = True
raise EOF('End of File (EOF) in read_nonblocking(). Very pokey platform.')
else:
raise TIMEOUT('Timeout exceeded in read_nonblocking().')
if self.child_fd in r:
try:
s = os.read(self.child_fd, size)
except OSError:
e = None
self.flag_eof = True
raise EOF('End Of File (EOF) in read_nonblocking(). Exception style platform.')
if s == '':
self.flag_eof = True
raise EOF('End Of File (EOF) in read_nonblocking(). Empty string style platform.')
if self.logfile is not None:
self.logfile.write(s)
self.logfile.flush()
return s
raise ExceptionPexpect('Reached an unexpected state in read_nonblocking().')
def read(self, size = -1):
    '''Read from the child in the manner of a file object's read().

    size == 0 returns an empty string without reading anything.
    size < 0 (the default) waits for self.delimiter and returns
    everything that came before it.
    size > 0 waits for either exactly "size" characters (newlines
    included) or self.delimiter, whichever self.expect() reports first,
    and returns the matched characters or, when the delimiter won, the
    text that preceded it.
    '''
    if size == 0:
        return ''
    if size < 0:
        self.expect(self.delimiter)
        return self.before
    # DOTALL lets '.' match newlines, so the pattern is "any size chars".
    exact_width = re.compile('.{%d}' % size, re.DOTALL)
    which_matched = self.expect([exact_width, self.delimiter])
    return self.after if which_matched == 0 else self.before
def readline(self, size = -1):
    '''Read and return one line from the child.

    The line terminator looked for is the \\r\\n pair (this is what a
    pseudo tty returns, even on UNIX), and it is kept at the end of the
    returned string. If self.delimiter is matched before a terminator
    is seen, the text read so far is returned without a terminator.

    The size argument is only inspected for the value 0, in which case
    an empty string is returned immediately; any other value is ignored,
    which differs from a standard file-like object.
    '''
    if size == 0:
        return ''
    line_end = '\r\n'
    matched = self.expect([line_end, self.delimiter])
    text = self.before
    if matched == 0:
        text = text + line_end
    return text
def __iter__(self):
    '''Return the object itself so it can be iterated like a file object.
    '''
    return self
def next(self):
    '''Return the next line from readline() for iterator support.

    Iteration stops (StopIteration is raised) as soon as readline()
    returns an empty string.
    '''
    line = self.readline()
    reached_end = (line == '')
    if reached_end:
        raise StopIteration
    return line
def readlines(self, sizehint = -1):
    '''Collect lines from readline() into a list and return it.

    Reading stops at the first empty (falsy) result from readline(),
    which is not included in the list. The "sizehint" argument is
    accepted for file-object compatibility but is ignored.
    '''
    collected = []
    line = self.readline()
    while line:
        collected.append(line)
        line = self.readline()
    return collected
def write(self, str):
    '''Pass the string to send() and discard the byte count it returns.
    '''
    self.send(str)
    return None
def writelines(self, sequence):
    '''Call write() once for every item produced by the sequence.

    Any iterable of strings is accepted. The items are written as they
    are, with no line separator added, and nothing is returned.
    '''
    for chunk in sequence:
        self.write(chunk)
def send(self, str):
    '''Write a string to the child and return the number of bytes written.

    The call first sleeps for self.delaybeforesend seconds. When a log
    file is set, the data is written to it and flushed before the data
    goes to the child file descriptor.
    '''
    time.sleep(self.delaybeforesend)
    log = self.logfile
    if log is not None:
        log.write(str)
        log.flush()
    return os.write(self.child_fd, str)
def sendline(self, str = ''):
    '''Send the string followed by os.linesep.

    Two send() calls are made, the text first and the line separator
    second; the sum of their byte counts is returned.
    '''
    return self.send(str) + self.send(os.linesep)
def sendeof(self):
    '''Send an end-of-file character to the child.

    The character makes the pending output buffer available to the
    waiting child program without waiting for end-of-line. When it is
    the first character of a line, the read() in the child returns 0,
    which signifies end-of-file. For that reason sendeof() only works
    as expected at the beginning of a line; no newline is sent here, so
    the caller is responsible for being at the start of a line.

    The character written is termios.CEOF when the termios module
    defines it, otherwise '\\x04'.
    '''
    # NOTE(review): the terminal attributes that are switched to canonical
    # mode (and restored afterwards) are those of sys.stdin, not of
    # self.child_fd -- confirm this is intended.
    stdin_fd = sys.stdin.fileno()
    saved_attrs = termios.tcgetattr(stdin_fd)
    canonical_attrs = termios.tcgetattr(stdin_fd)
    # Index 3 of the attribute list holds the local-mode flags.
    canonical_attrs[3] = canonical_attrs[3] | termios.ICANON
    try:
        termios.tcsetattr(stdin_fd, termios.TCSADRAIN, canonical_attrs)
        if hasattr(termios, 'CEOF'):
            eof_char = '%c' % termios.CEOF
        else:
            eof_char = '\x04'
        os.write(self.child_fd, eof_char)
    finally:
        # Always put the original attributes back, even if the write failed.
        termios.tcsetattr(stdin_fd, termios.TCSADRAIN, saved_attrs)
def eof(self):
    '''Return the flag_eof attribute, which records whether EOF was raised.
    '''
    eof_seen = self.flag_eof
    return eof_seen
def terminate(self, force = False):
    '''Try to make the child process terminate.

    If the child is already dead, True is returned at once. Otherwise
    SIGHUP, SIGCONT and SIGINT are sent in that order; after each signal
    the method sleeps for self.delayafterterminate seconds and returns
    True if the child is no longer alive. If the child survives all
    three and "force" is true, SIGKILL is sent as a last resort.

    Returns True if the child was terminated and False if it could not
    be terminated.
    '''
    if not self.isalive():
        return True
    for polite_signal in (signal.SIGHUP, signal.SIGCONT, signal.SIGINT):
        self.kill(polite_signal)
        time.sleep(self.delayafterterminate)
        if not self.isalive():
            return True
    if not force:
        return False
    self.kill(signal.SIGKILL)
    time.sleep(self.delayafterterminate)
    return not self.isalive()
def wait(self):
    '''Block until the child exits and return its exit status.

    No data is read from the child here, so this blocks forever if the
    child has unread output and has terminated: the child may have
    printed output and called exit(), but it still counts as alive until
    its output is read.

    On a normal exit, status/exitstatus are recorded and signalstatus is
    None. When the child was ended by a signal, exitstatus is None and
    signalstatus holds the signal number. terminated is set to True in
    both cases. The value returned is self.exitstatus.

    Raises ExceptionPexpect if the child is not alive when called, or if
    the status reported by waitpid says the child is stopped.
    '''
    if not self.isalive():
        raise ExceptionPexpect('Cannot wait for dead child process.')
    (_, status) = os.waitpid(self.pid, 0)
    # Preliminary value; the branches below overwrite it for the exited
    # and signalled cases.
    self.exitstatus = os.WEXITSTATUS(status)
    exited = os.WIFEXITED(status)
    if exited or os.WIFSIGNALED(status):
        self.status = status
        self.exitstatus = os.WEXITSTATUS(status) if exited else None
        self.signalstatus = None if exited else os.WTERMSIG(status)
        self.terminated = True
    elif os.WIFSTOPPED(status):
        raise ExceptionPexpect('Wait was called for a child process that is stopped. This is not supported. Is some other process attempting job control with our child pid?')
    return self.exitstatus
def isalive(self):
'''This tests if the child process is running or not.
This is non-blocking. If the child was terminated then this
will read the exitstatus or signalstatus of the child.
This returns True if the child process appears to be running or False if not.
It can take literally SECONDS for Solaris to return the right status.
raise ExceptionPexpect('isalive() encountered condition where "terminated" is 0, but there was no child process. Did someone else call waitpid() on our process?')
raise ExceptionPexpect('isalive() encountered condition that should never happen. There was no child process. Did someone else call waitpid() on our process?')
else:
raise e
except:
e[0] == errno.ECHILD
if pid == 0:
return True
if pid == 0:
return True
if os.WIFEXITED(status):
self.status = status
self.exitstatus = os.WEXITSTATUS(status)
self.signalstatus = None
self.terminated = True
elif os.WIFSIGNALED(status):
self.status = status
self.exitstatus = None
self.signalstatus = os.WTERMSIG(status)
self.terminated = True
elif os.WIFSTOPPED(status):
raise ExceptionPexpect('isalive() encountered condition where child process is stopped. This is not supported. Is some other process attempting job control with our child pid?')
return False
def kill(self, sig):
    '''Deliver the given signal to the child process.

    In keeping with UNIX tradition the name is misleading: the child is
    only killed if the signal sent has that effect. Nothing is sent when
    isalive() reports that the child is not running.
    '''
    if not self.isalive():
        return
    os.kill(self.pid, sig)
def compile_pattern_list(self, patterns):
'''This compiles a pattern-string or a list of pattern-strings.
Patterns must be a StringType, EOF, TIMEOUT, SRE_Pattern, or
a list of those. Patterns may also be None which results in
an empty list.
This is used by expect() when calling expect_list().
Thus expect() is nothing more than::
cpl = self.compile_pattern_list(pl)
return self.expect_list(cpl, timeout)
If you are using expect() within a loop it may be more
efficient to compile the patterns first and then call expect_list().
This avoids calls in a loop to compile_pattern_list():